package com.deep.flink.sqlserver.task;


import com.deep.flink.sqlserver.bean.CommonBean;
import com.deep.flink.sqlserver.config.CollectionConfig;
import com.deep.flink.sqlserver.inter.ProcessDataInterface;
import com.deep.flink.sqlserver.map.TBoaContractDocMap;
import com.deep.flink.sqlserver.sink.SinkMysql;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

/**
 * @ClassName StudentTask
 * @Author deeprado
 * @Version 1.0.0
 * @Description 技术可行性测试代码
 * @Date 2023/5/5 11:14
 */
public class TBoaContractDocTask implements ProcessDataInterface {
    @Override
    public void process(DataStream<CommonBean> commonData) {
        // 数据过滤，只保留student表的数据
        SingleOutputStreamOperator<CommonBean> filterDataStream = commonData.filter(
                new FilterFunction<CommonBean>() {
                    @Override
                    public boolean filter(CommonBean commonBean) throws Exception {
                        return commonBean.getSource().get("table").equals("T_BOA_ContractDoc");
                    }
                }
        );
        // 新建侧边流分支（删除）
        //封装删除
        final OutputTag<CommonBean> deleteOptTBoaContractDoc = new OutputTag<CommonBean>("deleteOptTBoaContractDoc", TypeInformation.of(CommonBean.class));
        //数据分流
        SingleOutputStreamOperator<CommonBean> processData = filterDataStream.process(
                new ProcessFunction<CommonBean, CommonBean>() {
                    @Override
                    public void processElement(CommonBean commonBean, Context context, Collector<CommonBean> collector) throws Exception {
                        if (commonBean.getOp().equals("c") || commonBean.getOp().equals("u") || commonBean.getOp().equals("r")) {
                            //insert or update
                            collector.collect(commonBean);
                        }else {
                            //delete
                            context.output(deleteOptTBoaContractDoc, commonBean);
                        }
                    }
                }
        );

        String upsertSql = "replace into %s value(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)";
        String deleteSql = "delete from %s where FID=?";

        //insert,update
        processData
                .map(new TBoaContractDocMap())
                .addSink(
                        new SinkMysql(String.format(upsertSql, CollectionConfig.config.getProperty("mysql.t_boa_contract_doc.sql.table")))
                );
        //delete
        processData
                .getSideOutput(deleteOptTBoaContractDoc)
                .map(new TBoaContractDocMap())
                .addSink(
                        new SinkMysql(String.format(deleteSql,CollectionConfig.config.getProperty("mysql.t_boa_contract_doc.sql.table")))
                );
    }
}
